package com.bawei.persona.realtime.app.func;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bawei.persona.realtime.bean.TableProcess;
import com.bawei.persona.realtime.common.GmallConfig;
import com.bawei.persona.realtime.util.HBaseUtil;
import com.bawei.persona.realtime.util.MySQLUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


/**
 * 上海大数据学院
 * 项目规划及管理：李剑
 * 技术指导及需求分析：郭洵
 * 编程：楚志高
 *
 * @author bawei  bigdata sh
 * @since 2021-06-11
 */
public class DimHbaseSink extends RichSinkFunction<JSONObject> {

    // Batch configuration. Deliberately NOT transient: Flink serializes the
    // function instance to ship it to task managers, and transient fields are
    // reset to null after deserialization (field initializers do not re-run).
    private Integer maxSize = 1000;   // flush when this many Puts are buffered
    private Long delayTime = 5000L;   // or when this many ms elapsed since last flush

    // Runtime-only state, (re)created in open(); transient is correct here.
    private transient Connection connection;
    private transient Long lastInvokeTime;
    // Buffer Puts per sink table so a flush cannot mix rows destined for
    // different HBase tables (a single shared list would write every buffered
    // Put into whichever table the *current* record maps to).
    private transient Map<String, List<Put>> putsByTable;

    public DimHbaseSink() {
    }

    public DimHbaseSink(Integer maxSize, Long delayTime) {
        this.maxSize = maxSize;
        this.delayTime = delayTime;
    }

    /**
     * Opens the HBase connection and initializes the per-table batch buffers.
     *
     * @param parameters Flink runtime configuration (unused beyond super call)
     * @throws Exception if the HBase connection cannot be established
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // One HBase connection per parallel sink instance.
        connection = HBaseUtil.getConnection(
                GmallConfig.HBASE_ZOOKEEPER_QUORUM,
                2181);
        lastInvokeTime = System.currentTimeMillis();
        putsByTable = new HashMap<>();
    }

    /**
     * Converts one CDC envelope into an HBase {@link Put}, buffers it, and
     * flushes all buffers when the size or processing-time threshold is hit.
     *
     * <p>Expected input shape (Maxwell/Canal-style CDC), e.g.:
     * {@code {"database":"gmall2021","table":"base_category1","type":"insert",
     * "data":{"id":1407025480462028812,"name":"zhigao"}}}
     *
     * @param value   the CDC record
     * @param context Flink sink context (unused)
     * @throws Exception on HBase write failure
     */
    @Override
    public void invoke(JSONObject value, Context context) throws Exception {
        String sourceTable = value.getString("table");
        String type = value.getString("type");
        // The actual row payload lives under "data".
        JSONObject data = JSON.parseObject(value.getString("data"));
        if (data == null) {
            return; // envelope carried no payload — nothing to write
        }

        // Look up the sink mapping for this (source table, operation) pair.
        // NOTE(review): this queries MySQL once per record and builds SQL by
        // string concatenation from stream-supplied values; consider a cached,
        // parameterized lookup in MySQLUtil.
        List<TableProcess> tableProcessList = MySQLUtil.queryList(
                "select * from table_process where  source_table ='" + sourceTable
                        + "' and operate_type='" + type + "'",
                TableProcess.class, true);
        if (tableProcessList == null || tableProcessList.isEmpty()) {
            return; // no sink mapping configured for this record — skip it
        }
        TableProcess tableProcess = tableProcessList.get(0);

        // Row key is the source row's primary key.
        String rowKey = data.getString("id");
        if (rowKey == null) {
            return; // cannot build a Put without a row key
        }
        Put put = new Put(rowKey.getBytes(StandardCharsets.UTF_8));

        // Copy only the configured sink columns into column family "f1".
        for (String column : tableProcess.getSinkColumns().split(",")) {
            String columnValue = data.getString(column);
            if (columnValue != null) { // skip absent columns instead of NPE
                put.addColumn(
                        "f1".getBytes(StandardCharsets.UTF_8),
                        column.getBytes(StandardCharsets.UTF_8),
                        columnValue.getBytes(StandardCharsets.UTF_8));
            }
        }

        // Buffer under the fully-qualified target table name.
        String sinkTable = GmallConfig.HABSE_SCHEMA + "." + tableProcess.getSinkTable();
        putsByTable.computeIfAbsent(sinkTable, k -> new ArrayList<>(maxSize)).add(put);

        // Processing-time flush: size threshold across all buffers, or elapsed
        // time since the last flush. (Time is only checked when a record
        // arrives; an idle stream keeps its tail batch until close().)
        int buffered = 0;
        for (List<Put> puts : putsByTable.values()) {
            buffered += puts.size();
        }
        long currentTime = System.currentTimeMillis();
        if (buffered >= maxSize || currentTime - lastInvokeTime >= delayTime) {
            flushAll();
            lastInvokeTime = currentTime;
        }
    }

    // Writes each buffered batch to its own HBase table, then clears buffers.
    private void flushAll() throws Exception {
        for (Map.Entry<String, List<Put>> entry : putsByTable.entrySet()) {
            if (entry.getValue().isEmpty()) {
                continue;
            }
            // try-with-resources guarantees the Table handle is closed even
            // if the batch write throws.
            try (Table table = connection.getTable(TableName.valueOf(entry.getKey()))) {
                table.put(entry.getValue());
            }
        }
        putsByTable.clear();
    }

    /**
     * Flushes any remaining buffered Puts, then closes the HBase connection.
     */
    @Override
    public void close() throws Exception {
        try {
            if (putsByTable != null && !putsByTable.isEmpty()) {
                flushAll(); // don't drop the tail batch on shutdown
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
            super.close();
        }
    }
}
